Socket异步编程之 Async 模式(一)

作者:陈广
日期:2018-3-29


之前讲的 Begin/End 模式是 .NET Framework 2.0 时候出的基于 APM 的 Socket 异步编程模型。之后的 .NET Framework 3.5 出现了新的 Socket 编程模型。由BeginXXX/EndXXX对变为XXXAsync,使用超级复杂,当然对于高并发来说,性能肯定提升较高。现在 .NET Core 都出来了,也没看见有新的 Socket 编程模型。所以,这算是最新的了吧,重点讲解。郁闷的是微软官网网络编程文档使用的还是 Begin/End 模型。新模型仅有一个示例代码,只能上网找其它资料,关键是网上的资料孰优孰劣,一时很难辨别,只能硬着头皮上了。

新模型使用的是SocketAsyncEventArgs,此类专为高性能网络服务器应用程序而设计。它是 Begin/End 模式的一个增强版本,它的主要作用主要是解决之前异步过程时创建不可复用的异步对象而产生的。主要是在高并发下节省大量对象重分配和同步相关问题,从而实现在高并发吞吐下更少的资源损耗。SocketAsyncEventArgs实现了服务器网络编程中最先进的IOCP模型(Input Output Completion POrt,又称I/O完成端口),关于 IOCP,大家可以上网进行了解。由于是专为服务器而设计的,我就主要拿它写服务器程序吧。为省事,部分客户端还是用回原来的模型。

建立连接

先来看看SocketAsyncEventArgs是怎么个玩法。

单次连接

先来服务端程序,在 Visual Studio 中新建一个控制台应用程序,使用如下命名空间:

using System;
using System.Net;
using System.Net.Sockets;

代码如下:

static void Main(string[] args)
{
    IPAddress ip = IPAddress.Parse("127.0.0.1");
    IPEndPoint point = new IPEndPoint(ip, 5000);
    Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    try
    {
        listener.Bind(point);
        listener.Listen(100);
        SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
        acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnAccept);//将OnAccept方法加入acceptEventArg事件

        Console.WriteLine("开始侦听...");
        if(!listener.AcceptAsync(acceptEventArg))//异步侦听连接,如果返回true,则异步调用OnAccept
        {   //如果返回false,则同步调用OnAccept
            OnAccept(null, acceptEventArg);
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
    Console.ReadLine();
}
//收到连接后触发的接收事件原型
static void OnAccept(object sender, SocketAsyncEventArgs e)
{
    Socket handler = e.AcceptSocket; //专为新连接创建的socket
    Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");
}

所有对 Windows 应用程序编程熟悉的同学都不会对 EventArgs 这个单词感到陌生,所有的事件里的参数 e 的数据类型都是以它结尾的。SocketAsyncEventArgs类正是用于事件,也就是说最新的网络编程模型是基于事件的。

侦听使用的是AcceptAsync()方法,它接收一个SocketAsyncEventArgs参数,而事件方法(OnAccept)就包装在这个参数内,在收到一个连接后会触发这个事件方法(OnAccept),而为新连接创建的新 Socket 则被包装在OnAccept的参数e之内。之前的 Begin/end 模型中,我们通过IAsyncResult来传递状态和 Socket。同样地,这里通过SocketAsyncEventArgs来进行传递,只是SocketAsyncEventArgs传递的东西更多而已。

客户端程序来个之前使用过的最简单的程序:

static void Main(string[] args)
{
    IPAddress ip = IPAddress.Parse("127.0.0.1");
    try
    {
        Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        s.Connect(ip, 5000); //向服务器发起连接
        Console.WriteLine("开始连接服务器 {0} ...", ip.ToString());
        if (s.Connected)
        {
            Console.WriteLine("连接成功!");
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
    Console.ReadLine();
}

运行结果:

不错,有了一个好的开始,现在我们了解了事件机制。其实看着挺顺眼,比委托舒服。大家可能觉得这也不难嘛。别急,事情没这么简单。

如何接收多个连接?是不是如之前一样在AcceptAsync()外面加个While(true)循环调用?游戏不是这么玩的!之前的 Begin/End 模式使用While(true)导致频繁创建IAsyncResult,在高并发访问时,增加了垃圾回收的压力。现在的这套玩法是共用一个SocketAsyncEventArgs对象,改为垃圾回收再利用。再利用听起来很美,但实现起来就要多费些周折了。

多客户端连接

首先,要实现再利用,必须在用完e.AcceptSocket后将其值设置为null。然后还要把AcceptAsync包装在一个方法内,以实现在其它地方再调用。

更改服务器代码如下:

static Socket listener;
static void Main(string[] args)
{
    IPAddress ip = IPAddress.Parse("127.0.0.1");
    IPEndPoint point = new IPEndPoint(ip, 5000);
    listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    try
    {
        listener.Bind(point);
        listener.Listen(100);
        SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
        acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnAccept);
        StartAccept(acceptEventArg);//开始第一个侦听周期
        Console.WriteLine("开始侦听...");

    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
    Console.ReadLine();
}
//开始侦听,将AcceptAsync包装在此方法内
static void StartAccept(SocketAsyncEventArgs e)
{
    if (!listener.AcceptAsync(e))//异步侦听连接
    {
        OnAccept(null, e);
    }
}
//收到连接后触发的事件
static void OnAccept(object sender, SocketAsyncEventArgs e)
{
    Socket handler = e.AcceptSocket;
    e.AcceptSocket = null;//为重复利用e,必须使用此句代码
    Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");
    StartAccept(e);//进入下一个侦听周期,注意,参数e是上一个StartAccept传递过来的
}

更改客户端代码如下:

static void Main(string[] args)
{
    IPAddress ip = IPAddress.Parse("127.0.0.1");
    try
    {
        Socket s;
        //向服务端发送99个连接
        for (int i = 1; i < 100; i++)
        {
            s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            s.Connect(ip, 5000); //向服务器发起连接
            Console.WriteLine("开始连接服务器 {0} ...", ip.ToString());
            if (s.Connected)
            {
                Console.WriteLine("连接成功!");
            }
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
    Console.ReadLine();
}

客户端开了99条向服务发起请求的连接。

运行结果:

下图展示了AcceptAsync运行机制

这次把AcceptAsync包装在StartAccept方法中,并把SocketAsyncEventArgs作为参数进行传递;在收到连接请求后,AcceptAsync触发OnAccept并将参数传递进去;OnAccept处理完连接后,又将收到的参数传递给下一个StartAccept。从而完成了SocketAsyncEventArgs参数循环再利用。需要注意的是,在OnAccept,必须要把SocketAsyncEventArgs.AcceptSocket设置为null,才能再次使用它。

数据的接收

数据的读取和侦听连接的机制基本相同,不同之处仅在于不再需要将e.AcceptSocket的值设置为null

服务器程序

更改服务器代码如下:

static Socket listener;
static void Main(string[] args)
{
    IPAddress ip = IPAddress.Parse("127.0.0.1");
    IPEndPoint point = new IPEndPoint(ip, 5000);
    listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    try
    {
        listener.Bind(point);
        listener.Listen(100);
        SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
        acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnAccept);
        StartAccept(acceptEventArg);//开始第一个侦听周期
        Console.WriteLine("开始侦听...");

    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
    Console.ReadLine();
}
//开始接收,将AcceptAsync包装在此方法内
static void StartAccept(SocketAsyncEventArgs e)
{
    if (!listener.AcceptAsync(e))//异步侦听连接
    {
        OnAccept(null, e);
    }
}
//收到连接后触发的事件
static void OnAccept(object sender, SocketAsyncEventArgs e)
{
    Socket handler = e.AcceptSocket;
    Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");

    //为新连接开启一个异步读取消息进程
    SocketAsyncEventArgs receiveArg = new SocketAsyncEventArgs();//专为读取消息新建一个SocketAsyncEventArgs
    receiveArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnRead);//加入读取事件
    receiveArg.SetBuffer(new byte[1024], 0, 1024);//设置读取缓存
    receiveArg.AcceptSocket = e.AcceptSocket;//此处的AcceptSocket参数仅用于传递读取Socket
    StartRead(receiveArg);

    e.AcceptSocket = null;//为重复利用e,必须使用此句代码
    StartAccept(e);//进入下一个侦听周期,注意,参数e是上一个StartAccept传递过来的
}
//开始读取消息,将ReceiveAsync包装在此方法内
static void StartRead(SocketAsyncEventArgs e)
{
    if(!e.AcceptSocket.ReceiveAsync(e))//返回true,则会触发OnRead事件进行异步读取
    {   //返回false则同步读取
        OnRead(null, e);
    }
}
//收到消息后触发的事件
static void OnRead(object sender,SocketAsyncEventArgs e)
{
    if(e.BytesTransferred>0 && e.SocketError==SocketError.Success)
    {
        string sendStr = Encoding.Unicode.GetString(e.Buffer, e.Offset, e.BytesTransferred);
        Console.WriteLine($"收到来自{e.AcceptSocket.RemoteEndPoint.ToString()}的信息:{sendStr}");
        //进入下一个读取周期,参数e被循环利用
        StartRead(e);
    }
}

读取消息的机制如下图所示:

在侦听 Socket 收到一个新连接后,立即为此新连接创建读取专用的SocketAsyncEventArgs,然后设置读取事件,缓存,并利用AcceptSocket属性传递此连接专用的 Socket,然后调用ReceiveAsync方法异步等待读取信息。当完成一个读取后,会触发OnRead事件对接收到的消息进行处理。处理完毕则再次调用ReceiveAsync方法进入下一个等待读取周期。

客户端程序

这次我们使用 Task 来创建线程,引入如下命名空间:

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using System.Threading;

代码如下:

static void Main(string[] args)
{
    IPAddress ip = IPAddress.Parse("127.0.0.1");
    try
    {
        //向服务端发送99个连接
        for (int i = 1; i < 100; i++)
        {
            Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            s.Connect(ip, 5000); //向服务器发起连接
            Console.WriteLine("开始连接服务器 {0} ...", ip.ToString());
            if (s.Connected)
            {
                Console.WriteLine("连接成功!");
            }
            //使用Task让多个线程同时向服务发送消息
            Task.Run(() => SendMessage(s));
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
    Console.ReadLine();
}
//每个连接发送100条消息
static void SendMessage(Socket s)
{
    for (int j = 0; j < 100; j++)
    {
        byte[] sendBuff = Encoding.Unicode.GetBytes($"消息{j}");
        s.Send(sendBuff, sendBuff.Length, SocketFlags.None);
        Thread.Sleep(100);
    }
    s.Shutdown(SocketShutdown.Both);
    s.Close();
}

之前的文章我们从来没有用过s.Shutdown,这是微软官方推荐关闭 Socket 的正确方式。先 Shutdown 关闭连接,再 Close 套接字。

这次我们创建了99个 Socket 同时向服务器发送消息,每 Socket 隔0.1秒发送一条消息,共发送100条消息。

运行结果:

有意思的是,如果使用 Thread 而不是 Task 创建线程,你会发现消息的发送速度会快很多,使用 Task 甚至慢到不会出现粘包现象。而使用 Thread 则依然存在少数粘包现象。编写服务器程序,Task 肯定是首先,但如果仅仅编写客户端,而且线程不多,那么就需要根据实际情况来确定使用哪个机制了。

服务器关闭连接

上述程序还有一个比较严重的问题,服务器未对 Socket 有可能出现的错误进行相应处理,如果客户端中途退出,那么为此客户端创建的100个 Socket 还是存在于服务器,并没有关闭,所以还需要继续修改代码。

将服务器的OnRead方法代码更改如下:

Socket s = e.AcceptSocket;
if(e.BytesTransferred>0 && e.SocketError==SocketError.Success)
{
    string sendStr = Encoding.Unicode.GetString(e.Buffer, e.Offset, e.BytesTransferred);
    Console.WriteLine($"收到来自{s.RemoteEndPoint.ToString()}的信息:{sendStr}");
    //进入下一个读取周期,参数e被循环利用
    StartRead(e);
}
else
{
    string epStr = s.RemoteEndPoint.ToString();
    try
    {
        s.Shutdown(SocketShutdown.Both);
    }
    catch(Exception ex)
    {       
        Console.WriteLine(ex.Message);
    }
    s.Close();
    Console.WriteLine($"已关闭{epStr}连接");
}

再次运行程序,客户端在运行期间中途关闭,可以看到,100个连接同时被关闭。

数据的发送

数据的发送机制与数据接收类似,但又有所不同。接收是被动的,需要侦听等待,而发送是主动的。

服务器程序

修改服务器代码如下:

static Socket listener;
static void Main(string[] args)
{
    IPAddress ip = IPAddress.Parse("127.0.0.1");
    IPEndPoint point = new IPEndPoint(ip, 5000);
    listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    try
    {
        listener.Bind(point);
        listener.Listen(100);
        SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
        acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnAccept);
        StartAccept(acceptEventArg);//开始第一个侦听周期
        Console.WriteLine("开始侦听...");

    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
    Console.ReadLine();
}
//开始接收,将AcceptAsync包装在此方法内
static void StartAccept(SocketAsyncEventArgs e)
{
    if (!listener.AcceptAsync(e))//异步侦听连接
    {
        OnAccept(null, e);
    }
}
//收到连接后触发的事件
static void OnAccept(object sender, SocketAsyncEventArgs e)
{
    Socket handler = e.AcceptSocket;
    Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");

    //为新连接开启一个异步发送消息进程
    SocketAsyncEventArgs sendArg = new SocketAsyncEventArgs();//专为发送消息新建一个SocketAsyncEventArgs
    sendArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnSend);//加入发送事件          
    sendArg.AcceptSocket = e.AcceptSocket;//此处的AcceptSocket参数仅用于传递发送Socket
    sendArg.UserToken = 1; //UserToken专为用户存放数据,这里存放发送次数
    StartSend(sendArg);

    e.AcceptSocket = null;//为重复利用e,必须使用此句代码
    Console.WriteLine("进入下一个等待周期");
    StartAccept(e);//进入下一个侦听周期,注意,参数e是上一个StartAccept传递过来的
}
//开始读取消息,将ReceiveAsync包装在此方法内
static void StartSend(SocketAsyncEventArgs e)
{
    int i = (int)e.UserToken;
    if (i > 5)
    {
        return;
    }
    byte[] sendBuff = Encoding.Unicode.GetBytes($"服务器消息:{e.UserToken}");//发送缓存
    e.SetBuffer(sendBuff, 0, sendBuff.Length);//设置发送缓存
    e.UserToken = i + 1;
    if (!e.AcceptSocket.SendAsync(e))//返回true,则会触发OnSend事件进行异步读取
    {   //返回false则同步发送
        OnSend(null, e);
    }
}
//收到消息后触发的事件
static void OnSend(object sender, SocketAsyncEventArgs e)
{
    Socket s = e.AcceptSocket;
    if (e.SocketError == SocketError.Success)
    {
        Console.WriteLine($"成功向{s.RemoteEndPoint.ToString()}发送信息");
        Thread.Sleep(500);
        StartSend(e);//进入下一个发送周期
    }
    else
    {
        s.Shutdown(SocketShutdown.Send);
        s.Close();
    }
}

服务器每收到一个连接,便向此连接客户端每隔0.5秒发一条信息,共发5条。

发送流程和读取基本一样,UserToken属性专门用于存放用户数据。坑爹的是微软示例里它是AsyncUserToken,但此类型已被废弃,示例却不改。搞到我到处查资料查不到,费了一翻周折。现在UserTokenObject类型,随便你设计放什么都可以。只是用起来需要类型转换,有点麻烦。

客户端程序:

命名空间:

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

程序代码:

static void Main(string[] args)
{
    IPAddress ip = IPAddress.Parse("127.0.0.1");
    try
    {
        for (int i = 0; i < 5; i++)
        {
            Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            s.Connect(ip, 5000); //向服务器发起连接
            Console.WriteLine("开始连接服务器 {0} ...", ip.ToString());
            if (s.Connected)
            {
                Console.WriteLine("连接成功!");
            }
            Task.Run(() => ReceiveMessage(s));
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
    Console.ReadLine();
}
static void ReceiveMessage(Socket recvSocket)
{
    string ip = recvSocket.RemoteEndPoint.ToString();
    byte[] buff = new byte[1024]; //创建一个接收缓冲区
    try
    {
        while (true)
        {
            int count = recvSocket.Receive(buff, buff.Length, SocketFlags.None);
            string recvStr = Encoding.Unicode.GetString(buff, 0, count);
            Console.WriteLine("接收到来自{0}数据:{1}", ip, recvStr);
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
    finally
    {
        recvSocket.Close();//客户端关闭时会引发异常,此时关闭此连接
        Console.WriteLine("{0} 已退出连接。", ip);
    }
}

客户端开了5条连接。然后等待服务器发信息。

运行结果:

写这程序调试了挺久,得出一个血的教训,不要去尝试手动关闭发送 Socket,会导致可怕的结果。大家可以试试,将服务器代码中的StartSend方法代码更改如下:

static void StartSend(SocketAsyncEventArgs e)
{
    int i = (int)e.UserToken;
    if (i > 5)
    {
        e.AcceptSocket.Close();
        return;
    }
    byte[] sendBuff = Encoding.Unicode.GetBytes($"服务器消息:{e.UserToken}");//发送缓存
    e.SetBuffer(sendBuff, 0, sendBuff.Length);//设置发送缓存
    e.UserToken = i + 1;
    if (!e.AcceptSocket.SendAsync(e))//返回true,则会触发OnSend事件进行异步读取
    {   //返回false则同步发送
        OnSend(null, e);
    }
}

其实,正确的用法是接收和发送共用一个SocketAsyncEventArgs,一般不会发生我这个程序里发生的问题。需要注意的是,千万不要象这篇文章一样去使用SocketAsyncEventArgs,正确的使用方法并不是这样的。我这样写只是为了让大家先更容易地理解它的机制,然后再深入学习。下一篇文章介绍正确的使用方法。